package com.atuguigu.flink.Day07;

import com.atuguigu.flink.Day01.Singlesensor.SensorSource;
import com.atuguigu.flink.sensor.SendsorReading;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.ArrayList;
import java.util.HashMap;

public class Example6 {
    // 写入es
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<SendsorReading> stream = env.addSource(new SensorSource());

        ArrayList<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("hadoop104",9200,"http"));
        ElasticsearchSink.Builder<SendsorReading> esBuilder = new ElasticsearchSink.Builder<>(
                httpHosts,
                new ElasticsearchSinkFunction<SendsorReading>() {
                    @Override
                    public void process(SendsorReading sendsorReading, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
                        HashMap<String, String> data = new HashMap<>();
                        data.put(sendsorReading.id,sendsorReading.temperture +"");
                        IndexRequest indexRequest = Requests
                                .indexRequest()
                                .index("sensor-reading")
                                .type("indexRequest")
                                .source(data);
                        requestIndexer.add(indexRequest);

                    }
                }
        );


        esBuilder.setBulkFlushBackoffRetries(1);//一次xingxieru
        stream.addSink(esBuilder.build());


        env.execute();

    }
}
